169914d49aa5dfc562fb399ccb29c9b946120c18,ninio-http/src/main/java/com/davfx/ninio/http/WebsocketSocket.java,WebsocketSocket,WebsocketSocket,#HttpClient#String#Address#Connecting#Closing#Failing#Receiver#Buffering#,120
Before Change
connecting.connected(this, connectAddress);
}
sender = httpClient.request()
.failing(failing)
.buffering(buffering)
.receiving(new HttpReceiver() {
private boolean opcodeRead = false;
private int currentOpcode;
private boolean lenRead = false;
private boolean mustReadExtendedLen16;
private boolean mustReadExtendedLen64;
private long currentLen;
private long currentRead;
private boolean mustReadMask;
private ByteBuffer currentExtendedLenBuffer;
private byte[] currentMask;
private ByteBuffer currentMaskBuffer;
private int currentPosInMask;
private long toPing = 0L;
@Override
public HttpContentReceiver received(Disconnectable disconnectable, HttpResponse response) {
// We should check everything here (status code, header Sec-WebSocket-Accept, ...)
if (response.status != 101) {
if (failing != null) {
failing.failed(new IOException("Could not connect to " + connectAddress + " [" + response.status + " " + response.reason + "]"));
}
return null;
}
return new HttpContentReceiver() {
@Override
public void received(ByteBuffer buffer) {
while (buffer.hasRemaining()) {
if (!opcodeRead && buffer.hasRemaining()) {
int v = buffer.get() & 0xFF;
if ((v & 0x80) != 0x80) {
LOGGER.error("Current implementation handles only FIN packets");
sender.cancel();
if (closing != null) {
closing.closed();
}
return;
}
currentOpcode = v & 0x0F;
opcodeRead = true;
}
if (!lenRead && buffer.hasRemaining()) {
int v = buffer.get() & 0xFF;
int len = v & 0x7F;
if (len <= 125) {
currentLen = len;
mustReadExtendedLen16 = false;
mustReadExtendedLen64 = false;
} else if (len == 126) {
mustReadExtendedLen16 = true;
mustReadExtendedLen64 = false;
currentExtendedLenBuffer = ByteBuffer.allocate(2);
} else {
mustReadExtendedLen64 = true;
mustReadExtendedLen16 = false;
currentExtendedLenBuffer = ByteBuffer.allocate(8);
}
mustReadMask = ((v & 0x80) == 0x80);
if (mustReadMask) {
currentMask = new byte[4];
currentMaskBuffer = ByteBuffer.wrap(currentMask);
currentPosInMask = 0;
}
lenRead = true;
}
while (mustReadExtendedLen16 && buffer.hasRemaining()) {
int v = buffer.get();
currentExtendedLenBuffer.put((byte) v);
if (currentExtendedLenBuffer.position() == 2) {
currentExtendedLenBuffer.flip();
currentLen = currentExtendedLenBuffer.getShort() & 0xFFFF;
mustReadExtendedLen16 = false;
currentExtendedLenBuffer = null;
}
}
while (mustReadExtendedLen64 && buffer.hasRemaining()) {
int v = buffer.get();
currentExtendedLenBuffer.put((byte) v);
if (currentExtendedLenBuffer.position() == 8) {
currentExtendedLenBuffer.flip();
currentLen = currentExtendedLenBuffer.getLong();
mustReadExtendedLen64 = false;
currentExtendedLenBuffer = null;
}
}
while (mustReadMask && buffer.hasRemaining()) {
int v = buffer.get();
currentMaskBuffer.put((byte) v);
if (currentMaskBuffer.position() == 4) {
currentMaskBuffer = null;
mustReadMask = false;
}
}
if (opcodeRead && lenRead && !mustReadExtendedLen16 && !mustReadExtendedLen64 && !mustReadMask && buffer.hasRemaining() && (currentRead < currentLen)) {
ByteBuffer partialBuffer;
int len = (int) Math.min(buffer.remaining(), currentLen - currentRead);
if (currentMask == null) {
partialBuffer = buffer.duplicate();
partialBuffer.limit(partialBuffer.position() + len);
buffer.position(buffer.position() + len);
currentRead += len;
} else {
partialBuffer = ByteBuffer.allocate(len);
while (buffer.hasRemaining() && (currentRead < currentLen)) {
int v = buffer.get() & 0xFF;
v ^= currentMask[currentPosInMask];
partialBuffer.put((byte) v);
currentRead++;
currentPosInMask = (currentPosInMask + 1) % 4;
}
partialBuffer.flip();
}
int opcode = currentOpcode;
long frameLength = currentLen;
if (currentRead == currentLen) {
opcodeRead = false;
lenRead = false;
mustReadExtendedLen16 = false;
mustReadExtendedLen64 = false;
currentExtendedLenBuffer = null;
mustReadMask = false;
currentMaskBuffer = null;
currentMask = null;
currentRead = 0L;
}
if (opcode == 0x09) {
if (toPing == 0L) {
toPing = frameLength;
sender.send(WebsocketUtils.headerOf(0x0A, frameLength));
}
toPing -= partialBuffer.remaining();
sender.send(partialBuffer);
} else if ((opcode == 0x01) || (opcode == 0x02)) {
if (receiver != null) {
receiver.received(WebsocketSocket.this, null, partialBuffer);
}
} else if (opcode == 0x08) {
LOGGER.debug("Connection closed by peer");
sender.cancel();
if (closing != null) {
closing.closed();
}
break;
}
}
}
}
@Override
public void ended() {
LOGGER.debug("Connection abruptly closed by peer");
sender.cancel();
if (closing != null) {
closing.closed();
}
}
};
}
})
.build(request);
}
@Override
After Change
.build()
);
HttpRequestBuilder b = httpClient.request();
sender = b.build(request);
b.receive(new HttpReceiver() {
private boolean opcodeRead = false;
private int currentOpcode;
private boolean lenRead = false;
private boolean mustReadExtendedLen16;
private boolean mustReadExtendedLen64;
private long currentLen;
private long currentRead;
private boolean mustReadMask;
private ByteBuffer currentExtendedLenBuffer;
private byte[] currentMask;
private ByteBuffer currentMaskBuffer;
private int currentPosInMask;
private long toPing = 0L;
@Override
public HttpContentReceiver received(final HttpResponse response) {
// We should check everything here (status code, header Sec-WebSocket-Accept, ...)
if (response.status != 101) {
queue.execute(new Runnable() {
@Override
public void run() {
if (closed) {
return;
}
if (connection != null) {
closed = true;
connection.failed(new IOException("[" + response.status + " " + response.reason + "]"));
}
}
});
return null;
}
return new HttpContentReceiver() {
@Override
public void received(ByteBuffer buffer) {
SendCallback sendCallback = new SendCallback() {
@Override
public void failed(IOException e) {
sender.cancel();
}
@Override
public void sent() {
}
};
while (buffer.hasRemaining()) {
if (!opcodeRead && buffer.hasRemaining()) {
int v = buffer.get() & 0xFF;
if ((v & 0x80) != 0x80) {
LOGGER.error("Current implementation handles only FIN packets");
sender.cancel();
connection.closed();
return;
}
currentOpcode = v & 0x0F;
opcodeRead = true;
}
if (!lenRead && buffer.hasRemaining()) {
int v = buffer.get() & 0xFF;
int len = v & 0x7F;
if (len <= 125) {
currentLen = len;
mustReadExtendedLen16 = false;
mustReadExtendedLen64 = false;
} else if (len == 126) {
mustReadExtendedLen16 = true;
mustReadExtendedLen64 = false;
currentExtendedLenBuffer = ByteBuffer.allocate(2);
} else {
mustReadExtendedLen64 = true;
mustReadExtendedLen16 = false;
currentExtendedLenBuffer = ByteBuffer.allocate(8);
}
mustReadMask = ((v & 0x80) == 0x80);
if (mustReadMask) {
currentMask = new byte[4];
currentMaskBuffer = ByteBuffer.wrap(currentMask);
currentPosInMask = 0;
}
lenRead = true;
}
while (mustReadExtendedLen16 && buffer.hasRemaining()) {
int v = buffer.get();
currentExtendedLenBuffer.put((byte) v);
if (currentExtendedLenBuffer.position() == 2) {
currentExtendedLenBuffer.flip();
currentLen = currentExtendedLenBuffer.getShort() & 0xFFFF;
mustReadExtendedLen16 = false;
currentExtendedLenBuffer = null;
}
}
while (mustReadExtendedLen64 && buffer.hasRemaining()) {
int v = buffer.get();
currentExtendedLenBuffer.put((byte) v);
if (currentExtendedLenBuffer.position() == 8) {
currentExtendedLenBuffer.flip();
currentLen = currentExtendedLenBuffer.getLong();
mustReadExtendedLen64 = false;
currentExtendedLenBuffer = null;
}
}
while (mustReadMask && buffer.hasRemaining()) {
int v = buffer.get();
currentMaskBuffer.put((byte) v);
if (currentMaskBuffer.position() == 4) {
currentMaskBuffer = null;
mustReadMask = false;
}
}
if (opcodeRead && lenRead && !mustReadExtendedLen16 && !mustReadExtendedLen64 && !mustReadMask && buffer.hasRemaining() && (currentRead < currentLen)) {
final ByteBuffer partialBuffer;
int len = (int) Math.min(buffer.remaining(), currentLen - currentRead);
if (currentMask == null) {
partialBuffer = buffer.duplicate();
partialBuffer.limit(partialBuffer.position() + len);
buffer.position(buffer.position() + len);
currentRead += len;
} else {
partialBuffer = ByteBuffer.allocate(len);
while (buffer.hasRemaining() && (currentRead < currentLen)) {
int v = buffer.get() & 0xFF;
v ^= currentMask[currentPosInMask];
partialBuffer.put((byte) v);
currentRead++;
currentPosInMask = (currentPosInMask + 1) % 4;
}
partialBuffer.flip();
}
int opcode = currentOpcode;
long frameLength = currentLen;
if (currentRead == currentLen) {
opcodeRead = false;
lenRead = false;
mustReadExtendedLen16 = false;
mustReadExtendedLen64 = false;
currentExtendedLenBuffer = null;
mustReadMask = false;
currentMaskBuffer = null;
currentMask = null;
currentRead = 0L;
}
if (opcode == 0x09) {
if (toPing == 0L) {
toPing = frameLength;
sender.send(WebsocketUtils.headerOf(0x0A, frameLength), sendCallback);
}
toPing -= partialBuffer.remaining();
sender.send(partialBuffer, sendCallback);
} else if ((opcode == 0x01) || (opcode == 0x02)) {
queue.execute(new Runnable() {
@Override
public void run() {
if (closed) {
return;
}
if (connection != null) {
connection.received(null, partialBuffer);
}
}
});
} else if (opcode == 0x08) {
LOGGER.debug("Connection closed by peer");
sender.cancel();
queue.execute(new Runnable() {
@Override
public void run() {
if (closed) {
return;
}
if (connection != null) {
closed = true;
connection.closed();
}
}
});
break;
}
}
}
}
@Override
public void ended() {
LOGGER.debug("Connection abruptly closed by peer");
sender.cancel();
queue.execute(new Runnable() {
@Override
public void run() {
if (closed) {
return;
}
if (connection != null) {
closed = true;
connection.closed();
}
}
});
}
};
}
@Override
public void failed(final IOException ioe) {
queue.execute(new Runnable() {
@Override
public void run() {
if (closed) {
return;
}
if (connection != null) {
closed = true;
connection.failed(ioe);
}
}
});
}
});
}
@Override